【streaming】hadoop-streaming实现

参考:https://hadoop.apache.org/docs/r1.0.4/cn/streaming.html

实质:在集群上执行的map-reduce流程,补充实现hive没有的功能

说明:任何可执行文件都可用作map、reduce文件

小例子:

  • 当只有mapper功能时:针对超大表和小表的join类型的操作,可将小表存成内存文件格式,逐行匹配大表
HADOOP_BIN=`which hadoop`
echo util.sh $HADOOP_BIN
HADOOP=$HADOOP_BIN
mydir=`pwd`

# 退出程序
function myexit() {
    if [ $? != 0 ]
    then
        echo "exit, fatal"
        exit 1
    fi
}
# 中间结果删除
test_file_rmr() {
    ${HADOOP_BIN} fs -test -e $1
    if [ $? -ne "0" ];then
        echo "[i see, hadoop output $1 do not exist]"
    else
        ${HADOOP_BIN} fs -rmr $1
    fi
    myexit
}
# 执行hadoop-streaming流程
base_call_mapper() {
    local out=$2
    test_file_rmr $out
    ${HADOOP_BIN} jar /usr/local/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar  \
        -input $1 \
        -output $out \
        -mapper "$3" \
        -reducer None \
        -file $4 \
        -jobconf mapred.output.compress=true \
        -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
        -jobconf mapred.job.name=$5 \
        -jobconf mapred.map.tasks=$6 \
        -jobconf mapred.job.map.capacity=$6 \
        -jobconf mapred.reduce.tasks=0
    myexit
    ${HADOOP_BIN} fs -dus $out
}
local myfile="$path/data/ad_orentation_configration.txt -file $path/stage1_combine_match.py -file $util -file $path/reduce_index.py"
base_call_mapper "$in_j" "$out_j" "python stage1_combine_match.py" "$myfile" "ad_orentation_cover_user_"$j 800
# input为输入数据在hdfs上的位置
# output为输出数据在hdfs上位置
# $3 为执行mapper的操作,在此mapper为 python stage1_combine_match.py
# $4 为执行时需要把本地文件加载到hdfs上,待加载的文件,在此包含ad_orentation_configration、stage1_combine_match.py reduce_index.py
# $5 $6 均为参数设置,设置mapper、reducer的相关配置,同hive中的配置相同

注:

  1. 可以不包含reducer操作,只需 -reducer None,-jobconf mapred.reduce.tasks=0 即可 2.
  • mapper和reducer均存在
base_call_multi() {
    local out=$2
    test_file_rmr $out
    ${HADOOP_BIN} jar /usr/local/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar  \
        -libjars ${mydir}/multiout.jar \
        -input $1 \
        -output $out \
        -mapper "$3" \
        -reducer "$4" \
        -file $5 \
        -outputformat adsfanstop.multiout \
        -jobconf mapred.output.compress=true \
        -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
        -jobconf mapred.job.name=$6 \
        -jobconf mapred.map.tasks=$7 \
        -jobconf mapred.job.map.capacity=$7 \
        -jobconf mapred.reduce.tasks=$8 \
        -jobconf mapred.job.reduce.capacity=$8
    myexit
    ${HADOOP_BIN} fs -dus $out
}

results matching ""

    No results matching ""